//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.NonBlockingThread;

/**
 * <p>A convenience base implementation of {@link Connection}.</p>
 * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
 * more traditional style of async reading.  A call to {@link #fillInterested()}
 * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
 * as appropriate.</p>
 */
public abstract class AbstractConnection implements Connection
{
    private static final Logger LOG = Log.getLogger(AbstractConnection.class);
    
    public static final boolean EXECUTE_ONFILLABLE=true;

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
    private final long _created=System.currentTimeMillis();
    private final EndPoint _endPoint;
    private final Executor _executor;
    private final Callback _readCallback;
    private final boolean _executeOnfillable;
    private int _inputBufferSize=2048;

    protected AbstractConnection(EndPoint endp, Executor executor)
    {
        this(endp,executor,EXECUTE_ONFILLABLE);
    }
    
    protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
    {
        if (executor == null)
            throw new IllegalArgumentException("Executor must not be null!");
        _endPoint = endp;
        _executor = executor;
        _readCallback = new ReadCallback();
        _executeOnfillable=executeOnfillable;
        _state.set(IDLE);
    }

    @Override
    public void addListener(Listener listener)
    {
        listeners.add(listener);
    }

    public int getInputBufferSize()
    {
        return _inputBufferSize;
    }

    public void setInputBufferSize(int inputBufferSize)
    {
        _inputBufferSize = inputBufferSize;
    }

    protected Executor getExecutor()
    {
        return _executor;
    }
    
    protected void failedCallback(final Callback callback, final Throwable x)
    {
        if (NonBlockingThread.isNonBlockingThread())
        {
            try
            {
                getExecutor().execute(new Runnable()
                {
                    @Override
                    public void run()
                    {
                        callback.failed(x);
                    }
                });
            }
            catch(RejectedExecutionException e)
            {
                LOG.debug(e);
                callback.failed(x);
            }
        }
        else
        {
            callback.failed(x);
        }
    }
    
    /**
     * <p>Utility method to be called to register read interest.</p>
     * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
     * will be called back as appropriate.</p>
     * @see #onFillable()
     */
    public void fillInterested()
    {
        LOG.debug("fillInterested {}",this);           
        
        while(true)
        {
            State state=_state.get();
            if (next(state,state.fillInterested()))
                break;
        }
    }
    
    public void fillInterested(Callback callback)
    {
        LOG.debug("fillInterested {}",this);

        while(true)
        {
            State state=_state.get();
            // TODO yuck
            if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
                break;
            State next=new FillingInterestedCallback(callback,state);
            if (next(state,next))
                break;
        }
    }
    
    /**
     * <p>Callback method invoked when the endpoint is ready to be read.</p>
     * @see #fillInterested()
     */
    public abstract void onFillable();

    /**
     * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
     * @param cause the exception that caused the failure
     */
    protected void onFillInterestedFailed(Throwable cause)
    {
        LOG.debug("{} onFillInterestedFailed {}", this, cause);
        if (_endPoint.isOpen())
        {
            boolean close = true;
            if (cause instanceof TimeoutException)
                close = onReadTimeout();
            if (close)
            {
                if (_endPoint.isOutputShutdown())
                    _endPoint.close();
                else
                    _endPoint.shutdownOutput();
            }
        }

        if (_endPoint.isOpen())
            fillInterested();        
    }

    /**
     * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
     * @return true to signal that the endpoint must be closed, false to keep the endpoint open
     */
    protected boolean onReadTimeout()
    {
        return true;
    }

    @Override
    public void onOpen()
    {
        LOG.debug("onOpen {}", this);

        for (Listener listener : listeners)
            listener.onOpened(this);
    }

    @Override
    public void onClose()
    {
        LOG.debug("onClose {}",this);

        for (Listener listener : listeners)
            listener.onClosed(this);
    }

    @Override
    public EndPoint getEndPoint()
    {
        return _endPoint;
    }

    @Override
    public void close()
    {
        getEndPoint().close();
    }

    @Override
    public int getMessagesIn()
    {
        return -1;
    }

    @Override
    public int getMessagesOut()
    {
        return -1;
    }

    @Override
    public long getBytesIn()
    {
        return -1;
    }

    @Override
    public long getBytesOut()
    {
        return -1;
    }

    @Override
    public long getCreatedTimeStamp()
    {
        return _created;
    }

    @Override
    public String toString()
    {
        return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
    }
    
    public boolean next(State state, State next)
    {
        if (next==null)
            return true;
        if(_state.compareAndSet(state,next))
        {
            LOG.debug("{}-->{} {}",state,next,this);
            if (next!=state)
                next.onEnter(AbstractConnection.this);
            return true;
        }
        return false;
    }
    
    private static final class IdleState extends State
    {
        private IdleState()
        {
            super("IDLE");
        }

        @Override
        State fillInterested()
        {
            return FILL_INTERESTED;
        }
    }


    private static final class FillInterestedState extends State
    {
        private FillInterestedState()
        {
            super("FILL_INTERESTED");
        }

        @Override
        public void onEnter(AbstractConnection connection)
        {
            connection.getEndPoint().fillInterested(connection._readCallback);
        }

        @Override
        State fillInterested()
        {
            return this;
        }

        @Override
        public State onFillable()
        {
            return FILLING;
        }

        @Override
        State onFailed()
        {
            return IDLE;
        }
    }


    private static final class RefillingState extends State
    {
        private RefillingState()
        {
            super("REFILLING");
        }

        @Override
        State fillInterested()
        {
            return FILLING_FILL_INTERESTED;
        }

        @Override
        public State onFilled()
        {
            return IDLE;
        }
    }


    private static final class FillingFillInterestedState extends State
    {
        private FillingFillInterestedState(String name)
        {
            super(name);
        }

        @Override
        State fillInterested()
        {
            return this;
        }

        State onFilled()
        {
            return FILL_INTERESTED;
        }
    }


    private static final class FillingState extends State
    {
        private FillingState()
        {
            super("FILLING");
        }

        @Override
        public void onEnter(AbstractConnection connection)
        {
            if (connection._executeOnfillable)
                connection.getExecutor().execute(connection._runOnFillable);
            else
                connection._runOnFillable.run();
        }

        @Override
        State fillInterested()
        {
            return FILLING_FILL_INTERESTED;
        }

        @Override
        public State onFilled()
        {
            return IDLE;
        }
    }


    public static class State
    {
        private final String _name;
        State(String name)
        {
            _name=name;
        }

        @Override
        public String toString()
        {
            return _name;
        }
        
        void onEnter(AbstractConnection connection)
        {
        }
        
        State fillInterested()
        {
            throw new IllegalStateException(this.toString());
        }

        State onFillable()
        {
            throw new IllegalStateException(this.toString());
        }

        State onFilled()
        {
            throw new IllegalStateException(this.toString());
        }
        
        State onFailed()
        {
            throw new IllegalStateException(this.toString());
        }
    }
    

    public static final State IDLE=new IdleState();
    
    public static final State FILL_INTERESTED=new FillInterestedState();
    
    public static final State FILLING=new FillingState();
    
    public static final State REFILLING=new RefillingState();

    public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
    
    public class NestedState extends State
    {
        private final State _nested;
        
        NestedState(State nested)
        {
            super("NESTED("+nested+")");
            _nested=nested;
        }
        NestedState(String name,State nested)
        {
            super(name+"("+nested+")");
            _nested=nested;
        }

        @Override
        State fillInterested()
        {
            return new NestedState(_nested.fillInterested());
        }

        @Override
        State onFillable()
        {
            return new NestedState(_nested.onFillable());
        }
        
        @Override
        State onFilled()
        {
            return new NestedState(_nested.onFilled());
        }
    }
    
    
    public class FillingInterestedCallback extends NestedState
    {
        private final Callback _callback;
        
        FillingInterestedCallback(Callback callback,State nested)
        {
            super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
            _callback=callback;
        }

        @Override
        void onEnter(final AbstractConnection connection)
        {
            Callback callback=new Callback()
            {
                @Override
                public void succeeded()
                {
                    while(true)
                    {
                        State state = connection._state.get();
                        if (!(state instanceof NestedState))
                            break;
                        State nested=((NestedState)state)._nested;
                        if (connection.next(state,nested))
                            break;
                    }
                    _callback.succeeded();
                }

                @Override
                public void failed(Throwable x)
                {
                    while(true)
                    {
                        State state = connection._state.get();
                        if (!(state instanceof NestedState))
                            break;
                        State nested=((NestedState)state)._nested;
                        if (connection.next(state,nested))
                            break;
                    }
                    _callback.failed(x);
                }  
            };
            
            connection.getEndPoint().fillInterested(callback);
        }
    }
    
    private final Runnable _runOnFillable = new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                onFillable();
            }
            finally
            {
                while(true)
                {
                    State state=_state.get();
                    if (next(state,state.onFilled()))
                        break;
                }
            }
        }
    };
    
    
    private class ReadCallback implements Callback
    {   
        @Override
        public void succeeded()
        {
            while(true)
            {
                State state=_state.get();
                if (next(state,state.onFillable()))
                    break;
            }
        }

        @Override
        public void failed(final Throwable x)
        {
            _executor.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    while(true)
                    {
                        State state=_state.get();
                        if (next(state,state.onFailed()))
                            break;
                    }
                    onFillInterestedFailed(x);
                }
            });
        }
        
        @Override
        public String toString()
        {
            return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
        }
    };
}
